package com.derbysoft.nuke.kafka.manager.infrastructure.jmx;

import com.derbysoft.nuke.kafka.manager.domain.model.Broker;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Service
@Slf4j
public class JMXServiceImpl implements JMXService {

    private ConcurrentMap<String, JMXConnector> jmxConnectors = Maps.newConcurrentMap();

    @Override
    public Map<String, Object> getBrokerTopicMetrics(Broker broker) {
        List<String> metricNames = Lists.newArrayList(
                "MessagesInPerSec",
                "BytesInPerSec",
                "BytesOutPerSec",
                "BytesRejectedPerSec",
                "FailedFetchRequestsPerSec",
                "FailedProduceRequestsPerSec",
                "TotalFetchRequestsPerSec",
                "TotalProduceRequestsPerSec");
        return metricNames.stream().collect(Collectors.toMap(Function.identity(), metricName -> this.getBrokerTopicMetrics(broker, metricName)));
    }

    private Map<String, Object> getBrokerTopicMetrics(Broker broker, String metricsName) {
        try {
            JMXConnector connector = getJMXConnector(broker);
            String jmxName = String.format("kafka.server:type=BrokerTopicMetrics,name=%s", metricsName);
            return getReadableAttributes(connector, jmxName);
        } catch (Exception e) {
            logError("getSocketServerMetrics", broker);
        }
        return Maps.newHashMap();
    }

    @Override
    public Map<String, Object> getSocketServerMetrics(Broker broker) {
        try {
            JMXConnector connector = getJMXConnector(broker);
            String jmxName = String.format("kafka.server:type=socket-server-metrics,networkProcessor=0", broker.getId()); // TODO 是否永远取0
            return getReadableAttributes(connector, jmxName);
        } catch (Exception e) {
            logError("getSocketServerMetrics", broker);
        }
        return Maps.newHashMap();
    }

    @Override
    public Map<String, Object> getOSMetrics(Broker broker) {
        List<String> metricNames = Lists.newArrayList(
                "OperatingSystem",
                "Runtime",
                "Threading",
                "Memory"
        );
        Map<String, Object> objectMap = metricNames.stream().collect(Collectors.toMap(Function.identity(), metricName -> this.getJVM(broker, metricName)));
        Map<String, Object> runtimeMetrics = (Map<String, Object>) objectMap.get("Runtime");
        runtimeMetrics.remove("SystemProperties"); // too large
        return objectMap;
    }

    private Map<String, Object> getJVM(Broker broker, String type) {
        try {
            JMXConnector connector = getJMXConnector(broker);
            String jmxName = String.format("java.lang:type=%s", type);
            return getReadableAttributes(connector, jmxName);
        } catch (Exception e) {
            logError("get jvm info of " + type, broker);
        }
        return Maps.newHashMap();
    }

    private Map<String, Object> getReadableAttributes(JMXConnector connector, String jmxName) throws Exception {
        ObjectName objectName = new ObjectName(jmxName);
        MBeanServerConnection connection = connector.getMBeanServerConnection();
        MBeanInfo mBeanInfo = connection.getMBeanInfo(objectName);
        MBeanAttributeInfo[] attributes = mBeanInfo.getAttributes();
        List<String> readableAttributes = Stream.of(attributes)
                .filter(MBeanAttributeInfo::isReadable)
                .filter(attr -> !"ObjectName".equals(attr.getName()))
                .map(MBeanFeatureInfo::getName)
                .collect(Collectors.toList());
        AttributeList attributeList = connection.getAttributes(objectName, readableAttributes.toArray(new String[0]));
        return attributeList.asList().stream().collect(Collectors.toMap(Attribute::getName, Attribute::getValue));
    }

    private JMXConnector getJMXConnector(Broker broker) {
        return getJMXConnector(String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", broker.getHost(), broker.getJmxPort()));
    }

    private JMXConnector createJMXConnector(String url) throws IOException {
        JMXServiceURL serviceURL = new JMXServiceURL(url);
        return JMXConnectorFactory.connect(serviceURL, null);
    }

    private void logError(String message, Broker broker) {
        log.error("{} error, broker {}:{}:{}", message, broker.getId(), broker.getHost(), broker.getPort());
    }

    private JMXConnector getJMXConnector(String url) {
        if (!jmxConnectors.containsKey(url)) {
            synchronized (this) {
                if (!jmxConnectors.containsKey(url)) {
                    try {
                        JMXConnector connector = createJMXConnector(url);
                        jmxConnectors.put(url, connector);
                    } catch (IOException e) {
                        log.error("create jmx connector fail of url[" + url + "]", e);
                    }
                }
            }
        }
        return jmxConnectors.get(url);
    }

}
